package com.godenwater.recv.service;

import com.godenwater.core.spring.Application;
import com.godenwater.core.spring.BaseDao;
import com.godenwater.recv.server.summit.SummitMessage;
import com.godenwater.recv.server.summit.SummitMessage96;
import com.godenwater.recv.server.summit.SummitParser;
import com.godenwater.recv.server.summit.SummitParser96;
import com.godenwater.yanyu.utils.ByteUtil;
import com.godenwater.utils.ByteUtils;
import com.godenwater.web.manager.StationDataManager;
import com.godenwater.web.manager.StationManager;
import com.godenwater.web.rtu.model.RtuStationData;
import com.godenwater.web.rtu.model.RtuStationModel;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * 1.1【亿立能】报文消费者
 *
 * @author lipujun
 * @ClassName: MessageConsumer
 * @Description: 通过线程的方式将“报文”解析，然后入库，这是消息进行处理的第一个环节
 * <p>
 * 注意：只启动一个消费者
 * @date Mar 14, 2013
 */
public class MessageSummitConsumer extends AbstractMessageConsumer implements
        Runnable {

    private static Logger logger = LoggerFactory
            .getLogger(MessageSummitConsumer.class);

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    SimpleDateFormat sdf2 = new SimpleDateFormat("yyMMddHHmmss");
    SimpleDateFormat sdf3 = new SimpleDateFormat("yyMMdd");

    private static long DAY = 24 * 60 * 60 * 1000;

    private static long HOUR = 60 * 60 * 1000;

    private static String KEY = "messages";

    private BaseDao dao = (BaseDao) Application.getInstance()
            .getBean("baseDao");

    public MessageSummitConsumer() {

    }

    @Override
    public void run() {
        int i = 0;

    }

    private void processSummit96(SummitMessage96 sm, String channel, String message, boolean crcFlag) throws Exception {
        if (sm != null) {
            logger.info(">> stcd " + sm.getStcd() + " viewdate " + sm.getViewdate());
            String stcd = sm.getStcd();
            stcd = StringUtils.leftPad(stcd, 4, "0");
            Date viewDate = sdf3.parse(sm.getViewdate());
            try {
                String id = UUID.randomUUID().toString();
                this.writeRtuMessage(id, stcd, "SUMMIT", channel, ByteUtil.toHexString(sm.getFunccode()), 1, sdf3.parse(sm.getViewdate()), 1, 1, message, crcFlag, "", "", 0, 0);
                logger.info(">>西安山脉原始报文开始入库成功！id=" + id);
            } catch (Exception e) {
                e.printStackTrace();
            }
            //入库五分钟雨量和水位数据，五分钟数据大于零才入库

            RtuStationModel station = StationManager.getInstance().getStation(stcd);
            if (station == null) {
                logger.info(">>未配置测站信息，请配置RTU站码为 " + stcd + " 的国家站码!");
            }
            if (station != null) {
                String stcd8 = station.getStcd8();
                DateTime dt = new DateTime(viewDate);
                byte[] viewData = sm.getViewdata();
                BigDecimal baseValue = station.getDtmel();
                if (baseValue == null) {
                    baseValue = new BigDecimal("0");
                }
                //一共24个小时，每个小时12条五分钟 数据的记录
                int index = 0;
                for (int i = 0; i < 12 * 24; i++) {
                    byte[] tempData = new byte[3];//临时数据集合
                    System.arraycopy(viewData, index, tempData, 0, tempData.length);
                    dt = dt.plusMinutes(5);//添加5分钟标记
                    //五分钟雨量大于零才入库

                    BigDecimal rain = new BigDecimal(ByteUtil.toUnsigned(tempData[0]));
                    double rainVal = rain.multiply(new BigDecimal("0.1")).doubleValue();
                    if (rainVal > 0) {
                        writeYYConnvertData(stcd8, dt.toDate(), "1", 23, rainVal, 41);
                    }
                    //五分钟水位入库
                    //  BigDecimal bd = new BigDecimal(tempData[0]);
                    byte[] zByte = new byte[]{tempData[2], tempData[1]};

                    BigDecimal bz = new BigDecimal(ByteUtil.bytesToUshort(zByte));
                    BigDecimal z = bz.multiply(new BigDecimal("0.01"));
                    logger.debug(">>时间=" + dt.toString("yyyy-MM-dd HH:mm:ss") + " 雨量值=" + rainVal + " 水位值：" + " = " + z + baseValue + " 序号" + i + 1);
                    writeYYConnvertData(stcd8, dt.toDate(), "1", 11, baseValue.add(z).doubleValue(), 41);
                    index = index + 3;
                }
            }
        }
    }

    private void processSummit(SummitMessage sm, String channel, String message, boolean crcFlag) throws Exception {
        {
            if (sm != null) {
                logger.info(">> stcd " + sm.getStcd() + " viewdate " + sm.getViewdate());
                logger.info(">> 温度 " + sm.getTempature());
                logger.info(">> 电压1 " + sm.getVoltage1());
                logger.info(">> 电压2 " + sm.getVoltage2());
                //1、 写入原始报文库
                Date viewDate = sdf2.parse(sm.getViewdate());
                String stcd = sm.getStcd();
                stcd = StringUtils.leftPad(stcd, 4, "0");
                try {
                    String id = UUID.randomUUID().toString();
                    this.writeRtuMessage(id, stcd, "SUMMIT", channel,
                            ByteUtil.toHexString(sm.getFunccode()), 1, sdf2.parse(sm.getViewdate()), 1, 1, message,
                            crcFlag, "", "", 0, 0);
                    logger.info(">>原始报文開始入庫成功！stcd=" + stcd);

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //------------------------------------

                RtuStationModel station = StationManager.getInstance().getStation(stcd);

                if (station == null) {
                    logger.info(">>未配置测站信息，请配置RTU站码为 " + stcd + " 的国家站码!");
                }

                if (station != null) {
                    String stcd8 = station.getStcd8();

                    //定时上报0x80
                    logger.info(">>开始处理报文类型为 " + ByteUtils.bytesToHexString(new byte[]{sm.getFunccode()}) + " 的报文数据!");
                    if (sm.getFunccode() == (byte) 0x80) {
                        //水位数据0xB5 雨量数据0xBB 流量数据0xB6

                        DateTime dt = new DateTime(viewDate);
                        dt = dt.minusHours(1);//减去1小时，作为时间起点
                        //雨量
                        if (sm.getType() == (byte) 0xBB || sm.getType() == (byte) 0xBE) {
                            byte[] rain = sm.getContent();
                            logger.info(">> 雨量定时报 " + rain.length);
                            int[] rv = SummitParser.parseRain(rain);
                            //---------------------------
                            RtuStationData oldData = StationDataManager.getInstance().getStation(stcd8);
                            if (oldData == null) {
                                RtuStationData newData = new RtuStationData();
                                newData.setStcd(stcd8);
                                newData.setRain5(new BigDecimal(rv[0]).multiply(new BigDecimal("0.1")).toString());
                                StationDataManager.getInstance().update(newData);
                                oldData = newData;
                            }
                            //------------------------
                            BigDecimal total = new BigDecimal(0);
                            BigDecimal oldval = new BigDecimal(oldData.getRain5());
                            for (int i = 0; i < 12; i++) {
                                dt = dt.plusMinutes(5);//添加5分钟标记
                                // 燕禹68字节协议中，记录的是翻斗的次数，一次0.5mm,需要将此数据翻转
                                BigDecimal bd = new BigDecimal(rv[i]);
                                BigDecimal newval = bd.multiply(new BigDecimal("0.1"));

                                double val = newval.subtract(oldval).doubleValue();
                                if (val > 0) {
                                    writeYYConnvertData(stcd8, dt.toDate(), "1", 23, val, 41);
                                    logger.info(">> 雨量值入库，分钟数据 " + i + " = " + val);//一位小数
                                }
                                total = total.add(newval.subtract(oldval));

                                oldval = newval;
                            }
                            writeYYConnvertData(stcd8, dt.toDate(), "1", 22, Double.parseDouble(total.toString()), 41);//
                            logger.info(">> 雨量值入库，小时数据  = " + total.toString());//一位小数
                            oldData.setRain5(new BigDecimal(rv[11]).multiply(new BigDecimal("0.1")).toString());
                            StationDataManager.getInstance().update(oldData);
                        }

                        //水位
                        if (sm.getType() == (byte) 0xB5 || sm.getType() == (byte) 0xBD) {
                            byte[] river = sm.getContent();
                            logger.info(">> 水位定时报 " + river.length);
                            float baseValue = SummitParser.parseRiverBaseValue(river);
                            logger.info(">> 水位基值 " + baseValue + " = " + baseValue);


                            int[] rv = SummitParser.parseRiver(river);
                            //float[] rv = SummitParser.parseBcdRiver(river);
                            for (int i = 0; i < 12; i++) {
                                dt = dt.plusMinutes(5);//添加5分钟标记
                                BigDecimal bd = new BigDecimal(rv[i]);
                                BigDecimal newval = bd.multiply(new BigDecimal("0.01"));

                                logger.info(">> 水位值 " + i + " = " + newval.add(new BigDecimal(baseValue)));
                                writeYYConnvertData(stcd8, dt.toDate(), "1", 11, newval.add(new BigDecimal(baseValue)).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(), 41);
                            }

                        }

                        //流量数
                        if (sm.getType() == (byte) 0xB6) {

                        }
                    }

                    //阈值加报0x86
                    if (sm.getFunccode() == (byte) 0x86) {
                        //雨量，以BCD码？
                        if (sm.getType() == (byte) 0xBB || sm.getType() == (byte) 0xBE) {
                            logger.info(">> 雨量加报 ");
                            byte[] rain = sm.getContent();
                            //累计雨量
                            byte[] rainTotal = new byte[4];
                            System.arraycopy(rain, 0, rainTotal, 0, 4);
                            logger.info(">>" + ByteUtil.byte2Str(new byte[]{rainTotal[3], rainTotal[2], rainTotal[1], rainTotal[0]}));
                            //今日雨量
                            byte[] rainDay = new byte[3];
                            System.arraycopy(rain, 4, rainDay, 0, 3);
                            ByteUtil.byte2Str(new byte[]{rainTotal[2], rainTotal[1], rainTotal[0]});
                            //问题，这个数据是需要与上次的数据相减么？？？

                        }

                        //水位
                        if (sm.getType() == (byte) 0xB5 || sm.getType() == (byte) 0xBD) {
                            logger.info(">> 水位加报 " + sm.getContent().length);
                            byte[] river = sm.getContent();


                        }

                        //流量数
                        if (sm.getType() == (byte) 0xB6) {

                        }
                    }

                }
            }
        }
    }

    /**
     * 处理业务逻辑
     * <p>
     * 第一步：原始报文入库
     * <p>
     * 第二步：判断单报、多报。单报写“入库队列”；多报写文件，并根据最后一条报文写“合并队列”
     *
     * @param message
     */

    public void process(String channel, String message) throws Exception {
        logger.info("原始报文開始入庫！");
        boolean crcFlag = true;
        String func = message.substring(6, 8);
        String endStr = message.substring(message.length() - 2, message.length());
        if (func.equals("96")) {
            if (endStr.equals("16")) {
                logger.info("山脉召测数据上来了！");
                SummitMessage96 summitMessage96 = SummitParser96.parse(ByteUtil.HexStringToBinary(message));
                processSummit96(summitMessage96, channel, message, crcFlag);
            }
        } else {
            // 解析报文体
            SummitMessage sm = SummitParser.parse(ByteUtil.HexStringToBinary(message));
            processSummit(sm, channel, message, crcFlag);
        }
    }


    public static void main(String[] arg) {

        MessageSummitConsumer consumer = new MessageSummitConsumer();

        //rain hour
        String message = "683168BE5656501440800100000000000000000000000000000000000000000000000000000000092002144321105812000000002116";
        try {
            //river hour
            message = "68 35 68 BD 56 56 50 14 40 80 01 10 27 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 09 20 02 14 43 21 10 58 12 00 00 00 00 7C 16 ";
            //message ="68 20 68 BB 12 34 15 03 40 86 01 35 02 00 00 35 00 00 00 00 41 15 15 12 03 12 68 25 10 35 12 00 00 70 00 F3 16";

            //加报
            //message = "68 1C 68 B5 56 56 50 14 40 86 01 00 70 11 00 00 35 30 13 20 02 14 75 25 10 64 12 00 00 00 00 D7 16 ";
            message = "682068bb42134c004086010065000015010000001405202306175029101713000000000316";
            //message = "68 31 68 BE 42 13 1B 00 40 80 01 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 B7 D4 00 00 00 00 16 07 06 17 93 35 10 20 13 00 00 00 00 01 16";
            message = StringUtils.replace(message, " ", "");
            consumer.process("test", message);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
